import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.LinkedList;
import java.util.List;

/**
 * Example Flink streaming job that runs a keyed, stateful map over a small
 * in-memory collection of {@code Order} records, using RocksDB as the state
 * backend.
 *
 * <p>The checkpoint location can be overridden on the command line with
 * {@code --checkpointUri <uri>}; when the argument is absent the job uses
 * {@link #DEFAULT_CHECKPOINT_URI}.
 */
public class StateExampleJ {
    /** Command-line key used to override the checkpoint location. */
    private static final String CHECKPOINT_URI_PARAM = "checkpointUri";

    /** Checkpoint location used when {@code --checkpointUri} is not supplied. */
    private static final String DEFAULT_CHECKPOINT_URI = "hdfs://Desktop:9000/flink/checkpoints";

    /**
     * Builds and executes the job.
     *
     * @param args command-line arguments parsed by {@link ParameterTool}; they are
     *             also registered as global job parameters. Recognised key:
     *             {@code --checkpointUri} (optional).
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception
    {
        final ParameterTool parameters = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameters);

        // Take the checkpoint URI from the arguments instead of hard-coding it, so the
        // example can run against other clusters or file systems without a code change.
        final String checkpointUri = parameters.get(CHECKPOINT_URI_PARAM, DEFAULT_CHECKPOINT_URI);
        // The boolean flag enables incremental checkpointing in the RocksDB backend.
        env.setStateBackend(new RocksDBStateBackend(checkpointUri, true));

        // Arbitrary sample data (original author's notes, translated):
        //  - orders sharing the same i % 7 value count towards the same shop's revenue;
        //  - orders sharing the same i % 3 value trade the same product.
        // NOTE(review): the meaning of each Order constructor argument is not visible
        // in this file -- confirm against the Order class.
        List<Order> data = new LinkedList<>();
        for (long i = 1; i <= 25; i++)
        {
            data.add(new Order(i, i % 7, i % 3, i + 0.1));
        }

        // Event time is taken from Order.finishTime; watermarks tolerate 1 ms of
        // out-of-order arrival.
        DataStream<Order> dataStream = env
                .fromCollection(data)
                .setParallelism(1)
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.milliseconds(1))
                        {
                            @Override
                            public long extractTimestamp(Order element)
                            {
                                return element.finishTime;
                            }
                        });

        // Original author's note (translated): the output is the sales amount of each
        // kind of goods, continuously updated, so the printed results contain both
        // older and newer values.
        // NOTE(review): the stream is keyed by memberId here -- confirm what
        // RichMapFunction1 actually aggregates.
        dataStream.keyBy(o -> o.memberId).map(new RichMapFunction1()).print();

        env.execute();
    }
}


/*
Code taken from:
https://zhuanlan.zhihu.com/p/182467386
 */